# Utility tool to construct UNet (provides pix2pix.upsample used by the decoder)
!pip install git+https://github.com/tensorflow/examples.git
# Module to get variable name as str (used for checkpoint/log folder names)
!pip3 install varname
# Keep TensorBoard current for the inline dashboard below
!pip install -U tensorboard
# Load the TensorBoard notebook extension
%load_ext tensorboard
# Reload TensorBoard
# %reload_ext tensorboard
from datetime import datetime as dt
from requests import get
import os
from varname import argname
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder, RobustScaler
from sklearn.model_selection import train_test_split
from google.colab import drive
from scipy import io
from sklearn.model_selection import train_test_split
from tensorflow_examples.models.pix2pix import pix2pix
import cv2
import tensorflow as tf
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sklearn
import zipfile
# The dataset is already split into train and test sets, so the dataset preparation workflow for this notebook is:
!wget no-check-certificate \
"https://github.com/SarahHannes/data-science-bowl-2018/raw/main/data/data-science-bowl-2018.zip" \
-O "/content/data-science-bowl-2018.zip"
# Extract dataset from zip to current session
zip_ref = zipfile.ZipFile('/content/data-science-bowl-2018.zip', 'r') # Opens the zip file in read mode
zip_ref.extractall('/content')
zip_ref.close()
# Common spatial size every image and mask is resized to.
IMG_SIZE = (128, 128)

train_directory = '/content/data-science-bowl-2018-2/train'

# Load the training inputs; OpenCV reads BGR, so convert to RGB.
image_dir = os.path.join(train_directory, 'inputs')
train_images = []
for fname in sorted(os.listdir(image_dir)):
    bgr = cv2.imread(os.path.join(image_dir, fname))
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    train_images.append(cv2.resize(rgb, IMG_SIZE))
print('Total train images:', len(train_images))

# Load the matching segmentation masks as single-channel grayscale.
mask_dir = os.path.join(train_directory, 'masks')
train_masks = []
for fname in sorted(os.listdir(mask_dir)):
    gray = cv2.imread(os.path.join(mask_dir, fname), cv2.IMREAD_GRAYSCALE)
    train_masks.append(cv2.resize(gray, IMG_SIZE))
print('Total train masks:', len(train_masks))
test_images = []
test_masks = []
test_directory = '/content/data-science-bowl-2018-2/test'
# read all test images and convert to RGB (OpenCV loads BGR)
image_dir = os.path.join(test_directory, 'inputs')
for image_file in sorted(os.listdir(image_dir)):
    img = cv2.imread(os.path.join(image_dir, image_file))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, IMG_SIZE)
    test_images.append(img)
print('Total test images:', len(test_images))
test_images[0].shape
# read all test masks and convert to grayscale
mask_dir = os.path.join(test_directory, 'masks')
for mask_file in sorted(os.listdir(mask_dir)):
    mask = cv2.imread(os.path.join(mask_dir, mask_file), cv2.IMREAD_GRAYSCALE)
    mask = cv2.resize(mask, IMG_SIZE)
    test_masks.append(mask)
# Fix: this loop loads the *test* masks, but the original printed
# the label 'Total train masks'.
print('Total test masks:', len(test_masks))
test_masks[0].shape
# Stack the per-image Python lists into single ndarrays so they can be
# fed to scikit-learn / tf.data below.
train_images_np = np.array(train_images)
train_masks_np = np.array(train_masks)
test_images_np = np.array(test_images)
test_masks_np = np.array(test_masks)
# Plot a few images from the training set.
# Fix: the original indexed test_images/test_masks here, although both
# the comment and this section are about the training set.
plt.figure(figsize=(10, 4))
for i in range(1, 4):
    plt.subplot(1, 3, i)
    plt.imshow(train_images[i])
    plt.axis('off')
plt.show()
# Plot the corresponding training masks.
plt.figure(figsize=(10, 4))
for i in range(1, 4):
    plt.subplot(1, 3, i)
    plt.imshow(train_masks[i], cmap='gray')
    plt.axis('off')
plt.show()
# Add a trailing channel axis so each mask becomes (H, W, 1).
train_mask_np_expand = np.expand_dims(train_masks_np, axis=-1)
print(train_masks[0].min(), train_masks[0].max())
test_mask_np_expand = np.expand_dims(test_masks_np, axis=-1)
print(test_masks[0].min(), test_masks[0].max())

# Binarize the grayscale masks to {0, 1}, then invert so that
# background pixels are 1 and cell pixels are 0.
train_converted_masks = 1 - np.round(train_mask_np_expand / 255)
test_converted_masks = 1 - np.round(test_mask_np_expand / 255)

# Scale image pixel values from [0, 255] into [0, 1].
train_converted_images = train_images_np / 255.0
test_converted_images = test_images_np / 255.0
# Hold out 20% of the training data for validation; fixed seed keeps the
# split reproducible across runs.
X_train, X_val, y_train, y_val = train_test_split(
    train_converted_images, train_converted_masks,
    test_size=0.2, random_state=0)

# Wrap each array as a tf.data.Dataset of individual examples.
X_train_ts = tf.data.Dataset.from_tensor_slices(X_train)
y_train_ts = tf.data.Dataset.from_tensor_slices(y_train)
X_val_ts = tf.data.Dataset.from_tensor_slices(X_val)
y_val_ts = tf.data.Dataset.from_tensor_slices(y_val)
X_test_ts = tf.data.Dataset.from_tensor_slices(test_converted_images)
y_test_ts = tf.data.Dataset.from_tensor_slices(test_converted_masks)
# Zip so that each set contains both the images and the masks together.
# zip so that the dataset contains the feature and label together
train = tf.data.Dataset.zip((X_train_ts, y_train_ts))
val = tf.data.Dataset.zip((X_val_ts, y_val_ts))
test = tf.data.Dataset.zip((X_test_ts, y_test_ts))
# Fix: 'Lenth' -> 'Length' in the printed labels.
print('Length of train', len(X_train_ts))
print('Length of val', len(X_val_ts))
print('Length of test', len(X_test_ts))
BATCH_SIZE = 16
AUTOTUNE = tf.data.AUTOTUNE
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = len(X_train_ts) // BATCH_SIZE
# Fix: validation_steps must be derived from the *validation* set;
# the original used len(X_test_ts) here, even though model.fit below
# validates on `val`, not `test`.
VALIDATION_STEPS = len(X_val_ts) // BATCH_SIZE
# Training pipeline: cache -> shuffle -> batch -> repeat -> prefetch.
train = train.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()
train = train.prefetch(buffer_size=AUTOTUNE)
# Validation repeats because fit() consumes VALIDATION_STEPS per epoch.
val = val.batch(BATCH_SIZE).repeat()
val = val.prefetch(buffer_size=AUTOTUNE)
test = test.batch(BATCH_SIZE).prefetch(buffer_size=AUTOTUNE)
print('shape of prefetch dataset: ')
print('Input image shape:', train.element_spec[0])
print('Output mask shape:', train.element_spec[1])
# MobileNetV2 backbone (default pretrained weights, classifier head
# removed) used as the U-Net encoder.
base_model = tf.keras.applications.MobileNetV2(
    input_shape=[128, 128, 3], include_top=False)

# Encoder activations tapped for the skip connections, ordered from
# shallow/high-resolution to deep/low-resolution.
layer_names = [
    'block_1_expand_relu',   # 64x64
    'block_3_expand_relu',   # 32x32
    'block_6_expand_relu',   # 16x16
    'block_13_expand_relu',  # 8x8
    'block_16_project',      # 4x4
]
# Inspect the backbone's layers and output shapes.
base_model.summary()
# Plot base model
tf.keras.utils.plot_model(base_model, show_shapes=True)

# Feature-extraction model that returns every tapped activation at once.
base_model_outputs = [base_model.get_layer(name).output for name in layer_names]
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)
# Freeze the pretrained encoder weights during training.
down_stack.trainable = False
def unet_model(output_channels):
    """Build the modified U-Net.

    Combines the frozen MobileNetV2 encoder (`down_stack`), the pix2pix
    upsampling decoder (`up_stack`), and a final transposed convolution
    that emits `output_channels` logit maps at the input resolution
    (no activation on the last layer).
    """
    inputs = tf.keras.layers.Input(shape=[IMG_SIZE[0], IMG_SIZE[1], 3])

    # Encoder pass: collect skip activations; the deepest seeds the decoder.
    skips = down_stack(inputs)
    x = skips[-1]

    # Decoder pass: upsample, then concatenate with the matching skip,
    # walking the remaining activations deep-to-shallow.
    for up, skip in zip(up_stack, reversed(skips[:-1])):
        x = tf.keras.layers.Concatenate()([up(x), skip])

    # Final 64x64 -> 128x128 upsampling producing the class logits.
    last = tf.keras.layers.Conv2DTranspose(
        filters=output_channels, kernel_size=3, strides=2, padding='same')
    return tf.keras.Model(inputs=inputs, outputs=last(x))
# Decoder upsampling blocks from tensorflow_examples' pix2pix, ordered
# deepest-first to mirror the reversed skip connections.
up_stack = [
    pix2pix.upsample(512, 3),  # 4x4 -> 8x8
    pix2pix.upsample(256, 3),  # 8x8 -> 16x16
    pix2pix.upsample(128, 3),  # 16x16 -> 32x32
    pix2pix.upsample(64, 3),   # 32x32 -> 64x64
]

# Two output channels, one logit map per segmentation class.
OUTPUT_CLASSESS = 2  # one hot encoding output
model = unet_model(output_channels=OUTPUT_CLASSESS)
model.summary()
tf.keras.utils.plot_model(model, show_shapes=True)
def display(display_list):
    """Plot the given arrays side by side in one figure.

    Items are captioned 'Input Image', 'True Mask', 'Predicted Mask'
    in order, so the list is expected to hold at most three entries.
    """
    titles = ['Input Image', 'True Mask', 'Predicted Mask']
    plt.figure(figsize=(15, 15))
    for idx, item in enumerate(display_list):
        plt.subplot(1, len(display_list), idx + 1)
        plt.title(titles[idx])
        plt.imshow(tf.keras.utils.array_to_img(item))
        plt.axis('off')
    plt.show()
def create_mask(pred_mask):
    """Collapse per-class logits into a single-channel label map.

    Argmax over the last (class) axis selects the winning class per
    pixel; a trailing channel axis is then restored so the result can
    be rendered like the ground-truth masks.
    """
    labels = tf.argmax(pred_mask, axis=-1)
    return labels[..., tf.newaxis]
def show_predictions(dataset=None, total_batch=1):
    """Show model predictions alongside the input image and true mask.

    When `dataset` is given, predicts on `total_batch` batches and
    displays the first example of each batch; otherwise samples from
    the global `train` pipeline.
    """
    if dataset:
        for image, mask in dataset.take(total_batch):
            pred = model.predict(image)
            display([image[0], mask[0], create_mask(pred)[0]])
    else:
        for images, masks in train.take(2):
            sample_image, sample_mask = images[0], masks[0]
            pred = model.predict(sample_image[tf.newaxis, ...])
            display([sample_image, sample_mask, create_mask(pred)[0]])
class DisplayCallback(tf.keras.callbacks.Callback):
    """Custom callback that renders a sample prediction after each epoch."""

    def on_epoch_end(self, epoch, logs=None):
        # clear_output(wait=True)
        show_predictions()
        print('\n Sample prediction after epoch {}\n'.format(epoch + 1))
# Ask the Colab session API for this notebook's filename and derive a
# per-notebook checkpoint folder name from it.
session_info = get('http://172.28.0.2:9000/api/sessions').json()
notebook_filename = session_info[0]['name']
notebook_name = notebook_filename.split('.')[0]
print('notebook_filename:', notebook_filename)
print('notebook_name:', notebook_name)
print()
# Checkpoints from this notebook go under a per-notebook subfolder in Drive.
gdrive_root_dir = '/content/drive/MyDrive/mida_deep_learning/checkpoints'
checkpoint_dir = os.path.join(gdrive_root_dir, notebook_name)
print('Root directory in google drive:', gdrive_root_dir)
print('Folder to save checkpoints in:', checkpoint_dir)
def train_model(model, epochs=20):
    """Compile and fit `model` on the global train/val pipelines.

    Uses sparse categorical cross-entropy on logits (the model's last
    layer has no activation), with best-weights checkpointing, early
    stopping, and TensorBoard logging keyed by the caller's variable
    name for `model` (via varname.argname).

    Returns the History object from model.fit.
    """
    # need to output logits since the output layer does not have any activation
    model.compile(optimizer='adam',
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    model_name = argname('model')

    # Checkpoint the best weights by validation accuracy.
    # Fix: the original only created `checkpoint_dir`, but the checkpoint
    # filepath includes a per-model subfolder — create the full path so
    # ModelCheckpoint does not fail on a missing directory.
    model_checkpoint_dir = os.path.join(checkpoint_dir, model_name)
    os.makedirs(model_checkpoint_dir, exist_ok=True)
    filepath = os.path.join(model_checkpoint_dir,
                            "weights-improvement-{epoch:02d}-{val_accuracy:.2f}.hdf5")
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

    # Stop when validation loss has not improved for 5 epochs.
    earlystopping_callback = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=5, verbose=2)

    # TensorBoard logs under logs/<model variable name>.
    logdir = os.path.join("logs", model_name)
    tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)

    return model.fit(train,
                     validation_data=val,
                     epochs=epochs,
                     steps_per_epoch=STEPS_PER_EPOCH,
                     validation_steps=VALIDATION_STEPS,
                     callbacks=[DisplayCallback(), checkpoint_callback,
                                tensorboard_callback, earlystopping_callback])
# Sanity-check: show a couple of input/mask pairs before training starts.
for images, masks in train.take(2):
    sample_image, sample_mask = images[0], masks[0]
    display([sample_image, sample_mask])

# Reset Keras global state, then train for up to 100 epochs
# (early stopping may end the run sooner).
tf.keras.backend.clear_session()
history = train_model(model, 100)
# Launch TensorBoard inline to inspect the training runs under ./logs.
%tensorboard --logdir logs
# Upload an experiment to tensorboard dev
! tensorboard dev upload --logdir /content/logs \
--name "Data Science Bowl 2018_MobileNetv2" \
--description "MobileNetv2 as base model for image segmentation"
# Show predictions on the test set.
# Visualize predictions for 5 batches from the held-out test set.
show_predictions(test, 5)